package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"gitee.com/zackeus/go-boot/rocketmq"
	"gitee.com/zackeus/go-boot/rocketmq/consumer"
	"gitee.com/zackeus/go-boot/rocketmq/primitive"
	"gitee.com/zackeus/go-boot/rocketmq/rlog"
	"gitee.com/zackeus/go-zero/core/logx"
)

// Connection settings for the demo consumer.
//
// NOTE(review): the endpoint and credentials are hard-coded here; consider
// loading them from configuration or the environment outside of local testing.
const (
	// Topic is the topic name passed to Subscribe in main.
	Topic = "cti-event-topic"

	// ConsumerGroup is the group name the push consumer is created with.
	ConsumerGroup = "test-group"

	// Endpoint is the host:port handed to the passthrough name-server resolver.
	Endpoint = "192.168.137.26:9876"

	// AccessKey is the access key supplied in the consumer credentials.
	AccessKey = "testuser"

	// SecretKey is the secret key supplied in the consumer credentials.
	SecretKey = "yulon123"
)

// main runs a demo push consumer: it configures logging, builds an ordered
// consumer for Topic in ConsumerGroup, logs the tags and body of every
// message it receives, and shuts the consumer down when the process is
// interrupted or asked to terminate.
func main() {
	if err := logx.SetUp(logx.LogConf{Mode: "console", Encoding: "plain"}); err != nil {
		fmt.Println(err)
		return
	}
	rlog.SetLevel("error")

	nameservers := []string{Endpoint}
	// Message trace tracking.
	traceCfg := &primitive.TraceConfig{
		Access:   primitive.Local,
		Resolver: primitive.NewPassthroughResolver(nameservers),
	}

	// Register for interrupt/termination signals up front. The channel is
	// buffered because signal.Notify does not block when delivering: a signal
	// arriving before we start receiving would otherwise be dropped.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(ConsumerGroup),
		consumer.WithNsResolver(primitive.NewPassthroughResolver(nameservers)),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithTrace(traceCfg),
		consumer.WithConsumeGoroutineNums(20),
		// Ordered consumption: one thread per queue.
		consumer.WithConsumerOrder(true),
		// Batch size of a single consume call.
		consumer.WithConsumeMessageBatchMaxSize(1),
		// Maximum number of retries.
		consumer.WithMaxReconsumeTimes(3),
		// Retry interval.
		consumer.WithSuspendCurrentQueueTimeMillis(time.Millisecond*1000),
		// Authentication credentials.
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: AccessKey,
			SecretKey: SecretKey,
		}),
	)
	if err != nil {
		// Without a consumer there is nothing to subscribe or start.
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	if err := c.Subscribe(Topic, consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		// Example of requesting an ordered retry for a message:
		//orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
		//fmt.Printf("orderly context: %v\n", orderlyCtx)
		//fmt.Println(fmt.Sprintf("receive: %s %s %s %s %s %s", msgs[0].Topic, msgs[0].MsgId, msgs[0].GetTags(), msgs[0].GetKeys(), msgs[0].Queue.QueueId, msgs[0].Body))
		//if msgs[0].Queue.QueueId%2 == 0 {
		//	fmt.Printf("msg: %v will rollback *********************\n", msgs[0].MsgId)
		//	/* ordered retry */
		//	return consumer.SuspendCurrentQueueAMoment, nil
		//}

		for _, msg := range msgs {
			logx.Info(fmt.Sprintf("[%s]: [%s]", msg.GetTags(), msg.Body))
		}
		return consumer.ConsumeSuccess, nil
	}); err != nil {
		// Starting a consumer whose subscription failed would receive nothing.
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// Note: start after subscribe
	if err := c.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	// Block until an interrupt or termination signal arrives, then shut down.
	<-sig
	if err := c.Shutdown(); err != nil {
		fmt.Printf("Shutdown Consumer error: %s\n", err.Error())
	}
}
